{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### TFX Components\n",
    "\n",
    "This notebook shows how to create pipeline that uses TFX components:\n",
    "\n",
    "* CsvExampleGen\n",
    "* StatisticsGen\n",
    "* SchemaGen\n",
    "* ExampleValidator\n",
    "* Transform\n",
    "* Trainer\n",
    "* Evaluator"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Put your KFP cluster endpoint URL here if working from GCP notebooks (or local notebooks). ('https://xxxxx.notebooks.googleusercontent.com/')\n",
    "kfp_endpoint='https://XXXXX.notebooks.googleusercontent.com/'"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "input_data_uri = 'gs://ml-pipeline-playground/tensorflow-tfx-repo/tfx/components/testdata/external/csv'\n",
    "\n",
    "#Only S3/GCS is supported for now.\n",
    "module_file = 'gs://ml-pipeline-playground/tensorflow-tfx-repo/v0.21.4/tfx/examples/chicago_taxi_pipeline/taxi_utils.py'"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import kfp"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import json\n",
    "from kfp.components import load_component_from_url\n",
    "\n",
    "download_from_gcs_op = load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/d013b8535666641ca5a5be6ce67e69e044bbf076/components/google-cloud/storage/download/component.yaml')\n",
    "\n",
    "CsvExampleGen_op    = load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/8c545b62/components/tfx/ExampleGen/CsvExampleGen/component.yaml')\n",
    "StatisticsGen_op    = load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/8c545b62/components/tfx/StatisticsGen/component.yaml')\n",
    "SchemaGen_op        = load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/8c545b62/components/tfx/SchemaGen/component.yaml')\n",
    "ExampleValidator_op = load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/8c545b62/components/tfx/ExampleValidator/component.yaml')\n",
    "Transform_op        = load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/8c545b62/components/tfx/Transform/component.yaml')\n",
    "Trainer_op          = load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/8c545b62/components/tfx/Trainer/component.yaml')\n",
    "Evaluator_op        = load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/8c545b62/components/tfx/Evaluator/component.yaml')\n",
    "\n",
    "def tfx_pipeline(\n",
    "    input_data_uri,\n",
    "):\n",
    "    download_task = download_from_gcs_op(\n",
    "        input_data_uri,\n",
    "    )\n",
    "\n",
    "    examples_task = CsvExampleGen_op(\n",
    "        input=download_task.output,\n",
    "        input_config=json.dumps({\n",
    "            \"splits\": [\n",
    "                {'name': 'data', 'pattern': '*.csv'},\n",
    "            ]\n",
    "        }),\n",
    "        output_config=json.dumps({\n",
    "            \"splitConfig\": {\n",
    "                \"splits\": [\n",
    "                    {'name': 'train', 'hash_buckets': 2},\n",
    "                    {'name': 'eval', 'hash_buckets': 1},\n",
    "                ]\n",
    "            }\n",
    "        }),\n",
    "    )\n",
    "    \n",
    "    statistics_task = StatisticsGen_op(\n",
    "        examples=examples_task.outputs['examples'],\n",
    "    )\n",
    "    \n",
    "    schema_task = SchemaGen_op(\n",
    "        statistics=statistics_task.outputs['statistics'],\n",
    "    )\n",
    "\n",
    "    # Performs anomaly detection based on statistics and data schema.\n",
    "    validator_task = ExampleValidator_op(\n",
    "        statistics=statistics_task.outputs['statistics'],\n",
    "        schema=schema_task.outputs['schema'],\n",
    "    )\n",
    "\n",
    "    # Performs transformations and feature engineering in training and serving.\n",
    "    transform_task = Transform_op(\n",
    "        examples=examples_task.outputs['examples'],\n",
    "        schema=schema_task.outputs['schema'],\n",
    "        module_file=module_file,\n",
    "    )\n",
    "\n",
    "    trainer_task = Trainer_op(\n",
    "        module_file=module_file,\n",
    "        examples=transform_task.outputs['transformed_examples'],\n",
    "        schema=schema_task.outputs['schema'],\n",
    "        transform_graph=transform_task.outputs['transform_graph'],\n",
    "        train_args=json.dumps({'num_steps': 10000}),\n",
    "        eval_args=json.dumps({'num_steps': 5000}),\n",
    "    )\n",
    "\n",
    "    # Uses TFMA to compute a evaluation statistics over features of a model.\n",
    "    model_analyzer = Evaluator_op(\n",
    "        examples=examples_task.outputs['examples'],\n",
    "        model=trainer_task.outputs['model'],\n",
    "        feature_slicing_spec=json.dumps({\n",
    "            'specs': [\n",
    "                {'column_for_slicing': ['trip_start_hour']},\n",
    "            ],\n",
    "        }),\n",
    "    )\n",
    "\n",
    "\n",
    "kfp.Client(host=kfp_endpoint).create_run_from_pipeline_func(\n",
    "    tfx_pipeline,\n",
    "    arguments=dict(\n",
    "        input_data_uri=input_data_uri,\n",
    "    ),\n",
    ")"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.5.3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}